package com.ftl.search.common.config;

import co.elastic.clients.elasticsearch.ElasticsearchClient;
import co.elastic.clients.json.jackson.JacksonJsonpMapper;
import co.elastic.clients.transport.ElasticsearchTransport;
import co.elastic.clients.transport.rest_client.RestClientTransport;
import lombok.extern.slf4j.Slf4j;
import org.apache.http.Header;
import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
import org.apache.http.impl.nio.reactor.IOReactorConfig;
import org.apache.http.message.BasicHeader;
import org.elasticsearch.client.Node;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.autoconfigure.elasticsearch.ElasticsearchProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.elasticsearch.client.elc.ElasticsearchTemplate;
import org.springframework.data.elasticsearch.core.convert.ElasticsearchConverter;
import org.springframework.util.StringUtils;


import javax.annotation.Resource;

/**
 * Spring configuration for the Elasticsearch REST client.
 *
 * <p>No special tuning is required for now; the client is built from the common
 * settings exposed by {@link ElasticsearchProperties} (URIs, timeouts, credentials).
 */
@Configuration
@Slf4j
public class ElasticsearchConfig {

    /** Keep-alive duration applied to pooled connections: one minute, in milliseconds. */
    private static final long KEEP_ALIVE_MILLIS = 60_000L;

    @Resource
    private ElasticsearchProperties elasticSearchProperty;

    /**
     * Builds the low-level {@link RestClientBuilder} from the configured URIs, timeouts,
     * keep-alive strategy and (optional) basic-auth credentials.
     *
     * @return a builder pre-configured with hosts, timeouts, a failure listener and
     *         the HTTP client customizations
     */
    @Bean
    public RestClientBuilder restClientBuilder() {
        HttpHost[] hosts = elasticSearchProperty.getUris().stream()
                .map(this::getElasticSearchHttpHosts)
                .toArray(HttpHost[]::new);

        return RestClient.builder(hosts)
                .setRequestConfigCallback(requestConfigBuilder -> {
                    // Use the FULL duration in milliseconds. Duration#toMillisPart() only yields
                    // the 0-999 ms sub-second component, so e.g. a 1s or 30s timeout would become 0,
                    // which the HTTP client interprets as "no timeout".
                    requestConfigBuilder.setConnectTimeout(
                            toTimeoutMillis(elasticSearchProperty.getConnectionTimeout().toMillis()));
                    requestConfigBuilder.setSocketTimeout(
                            toTimeoutMillis(elasticSearchProperty.getSocketTimeout().toMillis()));
                    return requestConfigBuilder;
                })
                .setFailureListener(new RestClient.FailureListener() {
                    // Invoked when a node fails; alerting could be hooked in here.
                    @Override
                    public void onFailure(Node node) {
                        log.error("[ ElasticSearchClient ] >>  node :{}, host:{},  fail ", node.getName(), node.getHost());
                    }
                })
                .setHttpClientConfigCallback(httpClientBuilder -> {
                    httpClientBuilder.disableAuthCaching();
                    // Option 2: avoid the client holding connections longer than the server's
                    // keep-alive window by capping the client-side keep-alive at one minute.
                    httpClientBuilder.setKeepAliveStrategy((httpResponse, httpContext) -> KEEP_ALIVE_MILLIS);
                    // Option 1 (tried, did not solve the "connection reset" after long idle periods):
                    // enabling TCP keep-alive through IOReactorConfig.custom().setSoKeepAlive(true).
                    // Apply credentials when configured.
                    return getHttpAsyncClientBuilder(httpClientBuilder);
                });
    }


    /**
     * Creates the Elasticsearch Java API client on top of the given builder.
     *
     * <p>The {@code @Bean} annotation below is currently commented out, so this method is
     * not registered with Spring by this class. Note that it sets default headers on the
     * builder instance it receives.
     *
     * @param restClientBuilder builder used to create the underlying {@link RestClient}
     * @return a client using a Jackson-based JSON mapper over the REST transport
     */
//    @Bean(value = "elasticsearchClient")
    public ElasticsearchClient elasticsearchClient(@Qualifier("restClientBuilder") RestClientBuilder restClientBuilder) {
        Header[] defaultHeaders = {new BasicHeader("Content-Type", "application/vnd.elasticsearch+json")};
        RestClient restClient = restClientBuilder.setDefaultHeaders(defaultHeaders).build();
        ElasticsearchTransport transport = new RestClientTransport(restClient, new JacksonJsonpMapper());
        return new ElasticsearchClient(transport);
    }

    /**
     * Creates an {@link ElasticsearchTemplate} from the given client and converter.
     *
     * <p>The {@code @Bean} annotation below is currently commented out, so this method is
     * not registered with Spring by this class.
     *
     * @param elasticsearchClient    client the template delegates to
     * @param elasticsearchConverter converter used by the template
     * @return the template instance
     */
//    @Bean(value = "elasticsearchTemplate")
    public ElasticsearchTemplate elasticsearchTemplate(ElasticsearchClient elasticsearchClient, ElasticsearchConverter elasticsearchConverter) {
        return new ElasticsearchTemplate(elasticsearchClient, elasticsearchConverter);
    }

    /**
     * Converts one configured URI (for example {@code http://es-1:9200},
     * {@code https://es-1:9200} or {@code es-1:9200}) into an {@link HttpHost}.
     *
     * <p>The scheme of the configured URI is preserved, so {@code https://} endpoints are no
     * longer silently downgraded to plain HTTP; a URI without a scheme still defaults to
     * {@code http}. A URI without an explicit port no longer fails with an
     * {@link ArrayIndexOutOfBoundsException}; the port is left unset ({@code -1}), which
     * {@link HttpHost} defines as the scheme's default port.
     *
     * @param host the configured URI; must not contain blanks
     * @return the parsed host
     * @throws IllegalArgumentException if the port portion is not a valid number
     */
    private HttpHost getElasticSearchHttpHosts(String host) {
        return HttpHost.create(host);
    }


    /**
     * Applies basic-auth credentials to the HTTP client builder when both a username and a
     * password are configured; otherwise returns the builder untouched.
     *
     * @param httpClientBuilder builder to customize
     * @return the same builder instance
     */
    private HttpAsyncClientBuilder getHttpAsyncClientBuilder(HttpAsyncClientBuilder httpClientBuilder) {
        String username = elasticSearchProperty.getUsername();
        String password = elasticSearchProperty.getPassword();
        // hasLength is the non-deprecated equivalent of the former StringUtils.isEmpty check.
        if (!StringUtils.hasLength(username) || !StringUtils.hasLength(password)) {
            return httpClientBuilder;
        }
        // Register the username/password for any auth scope.
        CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
        credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(username, password));
        httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
        return httpClientBuilder;
    }

    /**
     * Narrows a millisecond value to the {@code int} expected by the request config,
     * clamping instead of overflowing for out-of-range values.
     *
     * @param millis timeout in milliseconds
     * @return {@code millis} clamped to the {@code int} range
     */
    private static int toTimeoutMillis(long millis) {
        return (int) Math.max(Integer.MIN_VALUE, Math.min(Integer.MAX_VALUE, millis));
    }
}
